iT邦幫忙

2023 iThome 鐵人賽

DAY 13
0
AI & Data

Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇系列 第 13

[Day13] 零地點突破‧改-Airflow Taskflow API(下)

  • 分享至 

  • xImage
  •  

有鑒於昨天的實作部分,有朋友反應好像講不清楚,一下子就給 code,今天還是再針對每一個 part 來詳細說明~/images/emoticon/emoticon41.gif

一、術式順轉·蒼 - 整體邏輯架構

通常在寫大型專案的時候,我都會先用空的 function 來梳理邏輯,避免過程中把太多功能放到同一個 function,或是寫的過程被中斷,下次回來要花很多時間 review,當然,昨天的code算是很簡單的東西,但是習慣還是從簡單的培養起會比較好~

DAG 圖思考可能的流程架構

https://ithelp.ithome.com.tw/upload/images/20230928/20135427wcQBH5b5Kx.png
可能有些人覺得 DAG 圖是做完才會有,怎麼可能先拿到圖再開始 coding, DAG 圖當然是做完才會有(或是像前幾天用EmptyOperator來做其實也可以有😂),但是DAG 圖在做之前,其實就要自己先設計了,而且資料和程式都是自己寫,當然不會先拿到設計或流程圖啊,資料工程師又不是前端工程師,前端可以看一下 Wireframe 和設計圖去實作,資料工程師必須具備自己造水管的能力,不然身為水管工,不會設計水管架構圖就不合格了/images/emoticon/emoticon15.gif

step1-E: 確定先用 Extract 拿到資料

step2-T: 用 Transform 修改資料,計算出需要的資料

step3-L: Load 將資料送到目的地

當然這比較像是扮家家酒的 ETL,但如果能讓對於這觀念模糊的人更清楚一點點,那就夠了~

像在這部分,我們只需要知道我們會拿到訂單資料,然後處理過後會顯示出平均價格這樣就可以了

我們就可以依照想像中的流程寫出 虛擬碼 (Pseudocode),可能語法有錯都沒關係,重點是架構:

def extract():
    Get data

def transform():
    Do something

def load():
    Show the average price

Python 程式架構

接下來詳細看我們拿到的資料:

[
    {
        "order_id": "1001",
        "order_item": "薯餅蛋餅",
        "order_price": 45
    },
    {
        "order_id": "1002",
        "order_item": "大冰奶",
        "order_price": 35
    }
]

我們發現是 JSON array 格式的,有點類似 Python 的 List,Array 裡面的資料很像 Python 的 Dictionary,我們的需求是要算出訂單的平均價格,所以就要想辦法拿到 Dictionary 裡面的 order_price。

Extract 的部分:要用json模組把資料讀進來

import json
def extract(json_data):
    data = json.load(json_data)
    return data

Transform 的部分:要用 Loop 和 len() 計算 Price 加總和訂單數,才能算出平均

def transform(order_data):
    order_total = 0 # 加總後的結果變數
    for order_dict in order_data:
        order_total += order_dict['order_price'] # 用+=把每筆order的價格加到order_total
    order_count = len(order_data) # len直接拿到 order_data_json的數量
    order_average = order_total/order_count # 總價格/數量=平均價格
    return order_average

突然,我們發現這隻 Transform 不符合 function SOLID 的單一職責原則(Single Responsibility Principle),所以我們決定把 function 拆開(哈哈哈硬要誒/images/emoticon/emoticon01.gif)

def transform_sum(order_data_json):
    order_total = 0
    for order_dict in order_data_json:
        order_total += order_dict['order_price']
    return order_total

def transform_count(order_data_json):
    order_count = len(order_data_json)
    return order_count

def transform_averge(order_total, order_count):
    order_average = order_total/order_count
    return order_average

每個function只做一件事了,耶~~

為了等等能用 TaskGroup,只能忍耐了/images/emoticon/emoticon04.gif

Load 部分: print顯示出來,超簡單

def load(order_average):
    print(f"Average Order Price: {order_average}")

二、術式反轉·赫 - Airflow 語法細節

@dag

@dag(schedule_interval=None, start_date=datetime(2023, 9, 28))
def taskflow_etl_dag():
    pass

可以直接用一行去設定dag相關資訊,開始時間、排程、名稱等等,用 @dag 會預設 function 名稱就是 dag_id ,不用自己設定,也不用怕不小心設定不一樣,真是太好了~

@task

@task()
def extract(json_data):
    data = json.load(json_data)
    return data

直接在前面加上 @task,這個 python function 就會變成 pythonOperator 的 task 了,可以在後面括號設定細節,不設定會用 function 名稱就是 task_id

@task_group

@task_group 
def transform(order_data):

task_group的部分是可以把多個task放在一起,詳細介紹可以看前幾天的貼文,@task_group 也是一樣在function 前面加上就可以了

三、虛式·茈 - 合併成專案

還記得嗎?虛式·茈就是蒼加上赫,誒~只有我有看嗎/images/emoticon/emoticon01.gif,反正了解了邏輯架構和語法細節之後,只要把語法放到架構裡面去寫,基本上就能順利完成,但大多時候都不一定順利

融合

@dag(schedule_interval=None, start_date=datetime(2023, 9, 27))
def taskflow_etl_dag():
    @task()
    def extract():
        xxx
    @task_group
    def transform(order_data):
        @task()
        def transform_sum(order_data_json):
            xxx
        @task()
        def transform_count(order_data_json):
            xxx
        @task()
        def transform_averge(order_total, order_count):
            xxx
        order_average_result = transform_averge(
            transform_sum(order_data), transform_count(order_data)) 
        return order_average_result #要執行並回傳
    @task()
    def load(order_average):
        xxx
    load(transform(extract()))
taskflow_etl_dag()
  • 設定 task_group 當中的相依性
    order_average_result = transform_averge(transform_sum(order_data),transform_count(order_data))
    這就像之前學的
    [transform_sum,transform_count] >> transform_averge
    但是用taskflow api 就是要用 function return 來做設定

  • 設定 整體的順序:load(transform(extract())) 等同於之前的 extract >> transform >> load

這當中最需要注意的是在 task_group 當中的 task 盡量要在裡面執行並回傳,邏輯上會比較清晰,如果要在外面使用,代表其實就要重新思考這個 task 應不應該放在 task_group 當中。

完整的 code 請回昨天查看喔!!

四、五條老師RIP - 結語

五條老師不要走~今天用了另一種方式說明和昨天相同的 code,希望大家看完能夠清楚,明天真的要用docker了~預祝中秋節快樂,放完假胖5公斤/images/emoticon/emoticon71.gif


上一篇
[Day12] 零地點突破‧改-Airflow Taskflow API(上)
下一篇
[Day14] 吃了容器果實的鯨魚拉布-Docker(1)
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言